# Cache and Checkpoint

`cache` (or `persist`) is an important feature that does not exist in Hadoop. It makes Spark much faster at reusing a data set, e.g. in iterative machine learning algorithms, interactive data exploration, etc. Unlike Hadoop MapReduce jobs, Spark's logical/physical plan can be very large, so the computing chain may become so long that computing an RDD takes a lot of time. If, unfortunately, an error or exception occurs during the execution of a task, the whole computing chain needs to be re-executed, which is considerably expensive. Therefore, we need to `checkpoint` some time-consuming RDDs. Then, even if a downstream RDD goes wrong, the computation can resume from the data of the checkpointed RDDs.
## cache()
Let's take the `GroupByTest` from the Overview chapter as an example: the `FlatMappedRDD` has been cached, so job 1 can start directly from the `FlatMappedRDD`, since `cache()` makes the repeated data of an RDD shared by the jobs of the same application.
Q: What kind of RDD needs to be cached?
Those which will be repeatedly computed and are not too large.
Q: How to cache an RDD?

Just call `rdd.cache()` in the driver program, where `rdd` is an RDD visible to the user, e.g. an RDD produced by a `transformation()`. Some RDDs produced by Spark (not by the user) during the execution of a transformation cannot be cached by the user, e.g. the `ShuffledRDD` and `MapPartitionsRDD` produced during `reduceByKey()`.
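For instance, a minimal sketch (the data set and variable names here are made up for illustration):

```scala
// A user-visible RDD produced by a transformation; it can be cached because
// it will be reused by several jobs of the same application.
val pairs = sc.parallelize(1 to 1000000).map(x => (x % 100, x))
pairs.cache()

// The ShuffledRDD / MapPartitionsRDD created internally by reduceByKey()
// below are not user-visible, so they cannot be cached directly by the user.
val sums = pairs.reduceByKey(_ + _)
sums.count()    // first job: computes pairs and fills the cache
sums.collect()  // later job: starts from the cached pairs instead of recomputing them
```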
Q: How does Spark cache an RDD?

We can make a guess first. Intuitively, when a task gets the first record of an RDD, it tests whether this RDD should be cached; if so, the first record and all the following records are sent to `blockManager`'s `memoryStore`. If `memoryStore` cannot hold all the records, `diskStore` is used instead.

The implementation is similar to this guess, with one difference: Spark tests whether the RDD should be cached just before computing its first partition. If the RDD should be cached, the partition is computed and then cached into memory. `cache` only uses memory; writing to disk is called `checkpoint`.
After calling `rdd.cache()`, `rdd` becomes a `persistRDD` whose `storageLevel` is `MEMORY_ONLY`. `persistRDD` will tell the `driver` that it needs to be persisted.
The above can be found in the following source code:

```scala
rdd.iterator()
=> SparkEnv.get.cacheManager.getOrCompute(thisRDD, split, context, storageLevel)
=> key = RDDBlockId(rdd.id, split.index)
=> blockManager.get(key)
=> computedValues = rdd.computeOrReadCheckpoint(split, context)
      if (isCheckpointed) firstParent[T].iterator(split, context)
      else compute(split, context)
=> elements = new ArrayBuffer[Any]
=> elements ++= computedValues
=> updatedBlocks = blockManager.put(key, elements, tellMaster = true)
```
When `rdd.iterator()` is called to compute a partition of the `rdd`, a `blockId` is used to indicate which partition is going to be cached, where `blockId` is of type `RDDBlockId`, which distinguishes it from other kinds of data in `memoryStore`, such as a `task`'s `result`. Then `blockManager` is checked to see whether the partition has already been checkpointed. If so, the task has in effect already been executed, and no further computation is needed for this partition; the records of the partition are read from the checkpoint into `elements`, an `ArrayBuffer`. If not, the partition is computed first, and then all its records are put into `elements`. Finally, `elements` is handed to `blockManager` for caching.
`blockManager` saves `elements` (the partition) into a `LinkedHashMap[BlockId, Entry]` inside `memoryStore`. If the partition is bigger than `memoryStore`'s capacity (60% of the heap size), `blockManager` just returns, saying that it cannot hold the data. If the size is OK, it drops some RDD partitions that were cached earlier in order to make room for the new incoming partition. If the freed space is enough, the new partition is put into the `LinkedHashMap`; if not, it returns again, saying there is not enough space. It is worth mentioning that old partitions belonging to the same RDD as the new partition will not be dropped. Ideally, "first cached, first dropped".
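To make the idea concrete, here is a toy sketch of that eviction policy. `ToyMemoryStore` and its methods are made-up names (not Spark's real `MemoryStore`), and block ids are assumed to look like `rdd_<rddId>_<partition>`:

```scala
import scala.collection.mutable

// A minimal sketch of "first cached, first dropped", never dropping partitions
// that belong to the same RDD as the incoming one.
class ToyMemoryStore(capacity: Long) {
  // insertion-ordered, like memoryStore's LinkedHashMap[BlockId, Entry]
  private val entries = mutable.LinkedHashMap[String, Long]()    // blockId -> size
  private def used: Long = entries.values.sum
  private def rddIdOf(blockId: String) = blockId.split("_")(1)   // "rdd_<rddId>_<partition>"

  def put(blockId: String, size: Long): Boolean = {
    if (size > capacity) return false                 // cannot hold this partition at all
    // pick the earliest-cached blocks to drop, skipping blocks of the same RDD
    var freed = 0L
    val toDrop = mutable.ArrayBuffer[String]()
    for (oldId <- entries.keys.toList) {              // snapshot, in insertion order
      if (used - freed + size > capacity && rddIdOf(oldId) != rddIdOf(blockId)) {
        toDrop += oldId
        freed += entries(oldId)
      }
    }
    if (used - freed + size > capacity) return false  // still not enough space
    toDrop.foreach(entries.remove)                    // actually drop the old partitions
    entries.put(blockId, size)
    true
  }
}
```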
Q: How to read a cached RDD?

When a cached RDD is recomputed (in a later job), the `task` reads the data from `blockManager`'s `memoryStore` directly. Specifically, during the computation of some RDD partitions (by calling `rdd.iterator()`), `blockManager` is asked whether they are cached. If a partition is cached locally, `blockManager.getLocal()` is called to read the data from the local `memoryStore`. If the partition was cached on another node, `blockManager.getRemote()` is called. See below:
**The storage location of a cached partition:** the `blockManager` of the node on which a partition is cached notifies the `blockManagerMasterActor` on the master that the RDD partition has been cached. This information is stored in the `blockLocations: HashMap` of `blockManagerMasterActor`. When a task needs a cached RDD, it sends a `blockManagerMaster.getLocations(blockId)` request to the driver to get the partition's locations, and the driver looks up `blockLocations` to send the location info back.
**Reading a cached partition from other nodes:** once a task has the cached partition's location info, it sends a `getBlock(blockId)` request to the target node via `connectionManager`. The target node retrieves the cached partition from the `memoryStore` of its local `blockManager` and sends it back.
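A toy sketch of this read path follows. The names `locations` and `stores` are stand-ins for `blockManagerMasterActor.blockLocations` and each node's `memoryStore`; this is not the actual `BlockManager` code:

```scala
// Toy model: try the local memoryStore first ("getLocal"); otherwise ask the
// master where the block lives and fetch it from that node ("getRemote").
object CachedReadSketch {
  type BlockId = String
  type PartitionData = Seq[Any]

  // driver-side blockLocations: which nodes hold which cached block
  val locations = Map[BlockId, Seq[String]]("rdd_0_1" -> Seq("node2"))
  // each node's memoryStore contents
  val stores = Map[String, Map[BlockId, PartitionData]](
    "node1" -> Map.empty[BlockId, PartitionData],
    "node2" -> Map("rdd_0_1" -> Seq(1, 2, 3)))

  def get(thisNode: String, blockId: BlockId): Option[PartitionData] =
    stores(thisNode).get(blockId)                              // "getLocal": local memoryStore hit
      .orElse {
        locations.getOrElse(blockId, Nil).headOption           // ask the master for a location
          .flatMap(remoteNode => stores(remoteNode).get(blockId)) // "getRemote": fetch from it
      }

  def main(args: Array[String]): Unit =
    println(get("node1", "rdd_0_1"))   // Some(List(1, 2, 3)), served by the remote node
}
```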
## Checkpoint
Q: What kind of RDD needs to be checkpointed?
- the computation takes a long time
- the computing chain is too long
- depends on too many RDDs
In fact, saving the output of a `ShuffleMapTask` on local disk is also a kind of `checkpoint`, but it is only for the output data of a partition.
Q: When to checkpoint?

As mentioned above, every time a computed partition needs to be cached, it is cached into memory right away. `checkpoint`, however, does not follow this principle. Instead, it waits until the end of a job and then launches a separate job to finish the `checkpoint`. **An RDD which needs to be checkpointed will be computed twice; thus it is suggested to do `rdd.cache()` before `rdd.checkpoint()`.** In this case, the second job will not recompute the RDD; it will just read the cache. In fact, Spark also offers `rdd.persist(StorageLevel.DISK_ONLY)`, which is like caching on disk: it persists the RDD on disk during its first computation. However, this kind of `persist` and `checkpoint` are different; we will discuss the difference later.
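For example (the checkpoint directory below is a placeholder path):

```scala
// Checkpoint files must go to a reliable location, typically HDFS.
sc.setCheckpointDir("hdfs://namenode:9000/checkpoint")   // placeholder path

val pairs = sc.parallelize(1 to 1000000).map(x => (x % 100, x))

// cache() first, so that the extra checkpointing job launched at the end of
// the first job reads the cached partitions instead of recomputing the chain.
pairs.cache()
pairs.checkpoint()

pairs.count()   // job 1 computes (and caches) pairs, then a second job writes the checkpoint
```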
Q: How to implement checkpoint?

Here is the procedure:

An RDD to be checkpointed goes through the states [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ].
**Initialized**

On the driver side, after `rdd.checkpoint()` is called, the RDD is managed by `RDDCheckpointData`. The user should set the storage path for the checkpoint (on HDFS).
**marked for checkpointing**

After initialization, `RDDCheckpointData` marks the RDD as `MarkedForCheckpoint`.
**checkpointing in progress**

When a job finishes, `finalRdd.doCheckpoint()` is called. `finalRDD` scans the computing chain backwards. When it meets an RDD that needs to be checkpointed, the RDD is marked `CheckpointingInProgress`, and the configuration files needed for writing to HDFS, such as core-site.xml, are broadcast to the `blockManager` on the other worker nodes. After that, a job is launched to finish the `checkpoint`:
```scala
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf))
```
**checkpointed**

After the checkpointing job finishes, all the dependencies of the RDD are cleared and the RDD is set to checkpointed. Then, **a supplementary dependency is added and the parent RDD is set to `CheckpointRDD`**. The `CheckpointRDD` will be used in the future to read the checkpoint files from the file system and to regenerate the RDD's partitions.
What's interesting is the following: two RDDs are checkpointed in the driver program below, but only `result` is actually checkpointed successfully. It is not clear whether this is a bug or whether only the downstream RDD is intentionally checkpointed.

```scala
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),
                               (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)

val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)

pairs2.checkpoint

val result = pairs1.join(pairs2)
result.checkpoint
```
Q: How to read a checkpointed RDD?

`runJob()` calls `finalRDD.partitions()` to determine how many tasks there will be. `rdd.partitions()` checks whether the RDD has been checkpointed via `RDDCheckpointData`, which manages the checkpointed RDD. If yes, it returns the partitions of the checkpointed RDD (an `Array[Partition]`). When `rdd.iterator()` is called to compute a partition of the RDD, `computeOrReadCheckpoint(split: Partition)` is also called to check whether the RDD is checkpointed. If yes, the parent RDD's `iterator()`, i.e. `CheckpointRDD.iterator()`, is called. `CheckpointRDD` reads the files on the file system to produce the RDD's partitions. **That is why a `CheckpointRDD` parent is cleverly added to the checkpointed RDD.**
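To make the dispatch concrete, here is a toy, self-contained sketch; `ToyRDD`, `checkpointFile` and `doCheckpoint` are simplified stand-ins for illustration, not Spark's real classes:

```scala
// Toy model of "read from the checkpoint if one exists, else compute the partition".
class ToyRDD(computePartition: () => Seq[Int]) {
  private var checkpointFile: Option[Seq[Int]] = None   // stands in for the checkpoint file
  def isCheckpointed: Boolean = checkpointFile.isDefined

  // like rdd.iterator() -> computeOrReadCheckpoint()
  def iterator(): Iterator[Int] =
    if (isCheckpointed) checkpointFile.get.iterator      // CheckpointRDD.iterator(): read the file
    else computePartition().iterator                     // compute(split, context)

  // like the extra checkpointing job launched at the end of the first job
  def doCheckpoint(): Unit = checkpointFile = Some(computePartition().toList)
}

object ToyRDDDemo extends App {
  val rdd = new ToyRDD(() => { println("computing..."); 1 to 5 })
  println(rdd.iterator().sum)   // computes: prints "computing...", then 15
  rdd.doCheckpoint()            // computes again (twice in total) and "writes the file"
  println(rdd.iterator().sum)   // reads from the checkpoint; no recomputation
}
```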
Q: What is the difference between `cache` and `checkpoint`?

Here is an answer from Tathagata Das:

There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory (and/or disk). But the lineage (computing chain) of the RDD (that is, the sequence of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, **checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely.** This allows long lineages to be truncated and the data to be saved reliably in HDFS, which is naturally fault tolerant by replication.
Furthermore, `rdd.persist(StorageLevel.DISK_ONLY)` is also different from `checkpoint`. Although the former can persist RDD partitions to disk, the partitions are managed by `blockManager`. Once the driver program finishes, which means the thread in which `CoarseGrainedExecutorBackend` runs stops, `blockManager` stops as well, and the RDD cached on disk is dropped (the local files used by `blockManager` are deleted). In contrast, `checkpoint` persists the RDD to HDFS or a local directory. If not removed manually, the checkpoint files stay on disk and can be used by the next driver program.
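The contrast in a short illustrative snippet (paths are placeholders):

```scala
import org.apache.spark.storage.StorageLevel

// persist(DISK_ONLY): partitions are written to blockManager's local files,
// which are deleted when the application (and its executors) shuts down.
val onDisk = sc.parallelize(1 to 1000).map(_ * 2)
onDisk.persist(StorageLevel.DISK_ONLY)
onDisk.count()                 // first computation materializes the partitions on local disk

// checkpoint: partitions are written under the checkpoint directory and
// remain there after this driver program ends, until deleted manually.
sc.setCheckpointDir("hdfs://namenode:9000/checkpoint")   // placeholder path
val checked = sc.parallelize(1 to 1000).map(_ * 2)
checked.cache()                // avoid computing the RDD twice (see above)
checked.checkpoint()
checked.count()                // runs the job, then the extra checkpointing job
```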
## Discussion
When Hadoop MapReduce executes a job, it keeps persisting data (writing to HDFS) at the end of every task and every job. When executing a task, it keeps swapping data between memory and disk, back and forth. The problem with Hadoop's approach is that a task needs to be re-executed if any error occurs, e.g. a shuffle interrupted by an error will have only half of its data persisted on disk, and that persisted data has to be recomputed for the next run of the shuffle. Spark's advantage is that, when an error occurs, the next run can read data from the checkpoint; the downside is that checkpointing requires executing the job twice.
## Example
```scala
package internals

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object groupByKeyTest {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/Users/xulijie/Documents/data/checkpoint")

    val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
                                  (3, 'c'), (4, 'd'),
                                  (5, 'e'), (3, 'f'),
                                  (2, 'g'), (1, 'h'))
    val pairs = sc.parallelize(data, 3)

    pairs.checkpoint
    pairs.count

    val result = pairs.groupByKey(2)

    result.foreachWith(i => i)((x, i) => println("[PartitionIndex " + i + "] " + x))

    println(result.toDebugString)
  }
}
```